package org.codehaus.activemq.transport.tcp;

import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.transport.TransportServerChannelSupport;

/**
 * Server side of the TCP transport: owns a {@link ServerSocket}, accepts
 * incoming connections on a dedicated daemon thread and registers a
 * {@link TcpTransportChannel} for each accepted socket via
 * <code>addClient</code>.
 *
 * <p>Lifecycle: construct (binds or adopts the server socket), call
 * {@link #start()} to launch the accept thread, and {@link #stop()} to close
 * the server socket and wait for the accept thread to finish. Both
 * <code>start()</code> and <code>stop()</code> only act the first time they
 * are invoked.</p>
 */
public class TcpTransportServerChannel extends TransportServerChannelSupport
  implements Runnable
{
  private static final Log log = LogFactory.getLog(TcpTransportServerChannel.class);

  /** Default listen backlog. The misspelt name is kept for compatibility with subclasses. */
  protected static final int DEFEAULT_BACKLOG = 500;

  private final WireFormat wireFormat;
  protected String bindAddressURI;
  private Thread serverSocketThread;
  private ServerSocket serverSocket;
  private final SynchronizedBoolean closed;
  private final SynchronizedBoolean started;
  private boolean useAsyncSend = false;
  private int maxOutstandingMessages = 10;
  private int backlog = DEFEAULT_BACKLOG;

  /**
   * Creates a channel bound to the host and port of the given URI.
   *
   * <p>The server socket is created here, before any setter can be called,
   * so it is opened with the default backlog; a later
   * {@link #setBacklog(int)} does not affect it.</p>
   *
   * @param wireFormat the wire format handed to every accepted client channel
   * @param bindAddr   the address to listen on
   * @throws JMSException if the server socket cannot be created; the
   *                      underlying failure is attached as the linked exception
   */
  public TcpTransportServerChannel(WireFormat wireFormat, URI bindAddr)
    throws JMSException
  {
    this.wireFormat = wireFormat;
    this.bindAddressURI = bindAddr.toString();
    this.closed = new SynchronizedBoolean(false);
    this.started = new SynchronizedBoolean(false);
    try {
      this.serverSocket = createServerSocket(bindAddr);
      log.info("Listening for connections at: " + bindAddr);
    }
    catch (Exception se) {
      // Report through the logger rather than stdout / printStackTrace().
      log.error("Bind to " + this.bindAddressURI + " failed", se);
      JMSException jmsEx = new JMSException("Bind to " + this.bindAddressURI + " failed: " + se.getMessage());
      jmsEx.setLinkedException(se);
      throw jmsEx;
    }
  }

  /**
   * Creates a channel that accepts connections on an existing server socket.
   *
   * @param wireFormat   the wire format handed to every accepted client channel
   * @param serverSocket the socket to accept connections on
   */
  public TcpTransportServerChannel(WireFormat wireFormat, ServerSocket serverSocket) {
    this.wireFormat = wireFormat;
    this.serverSocket = serverSocket;
    this.closed = new SynchronizedBoolean(false);
    this.started = new SynchronizedBoolean(false);
    // String.valueOf avoids an NPE should getInetAddress() return null.
    this.bindAddressURI = String.valueOf(serverSocket.getInetAddress());
    log.info("Listening for connections at: " + this.bindAddressURI);
  }

  /**
   * Stops the channel: closes the server socket and waits for the accept
   * thread (if one was started) to terminate. Only the first call has any
   * effect. Failures while closing are logged, not propagated.
   */
  public void stop()
  {
    if (this.closed.commit(false, true)) {
      super.stop();
      try {
        if (this.serverSocket != null) {
          // Closing the socket makes a blocked accept() in run() fail, ending the loop.
          this.serverSocket.close();
        }
        Thread acceptThread = this.serverSocketThread;
        // The thread is null when start() was never called; joining the
        // current thread would block forever.
        if (acceptThread != null && acceptThread != Thread.currentThread()) {
          acceptThread.join();
        }
        this.serverSocketThread = null;
      }
      catch (InterruptedException e) {
        // Preserve the interrupt for the caller instead of swallowing it.
        Thread.currentThread().interrupt();
        log.warn("Caught while closing: " + e + ". Now Closed", e);
      }
      catch (Throwable e) {
        // Best-effort shutdown: never let a close failure escape stop().
        log.warn("Caught while closing: " + e + ". Now Closed", e);
      }
    }
  }

  /**
   * Starts the channel. The first call launches a daemon thread that runs
   * the accept loop; later calls only delegate to the superclass.
   *
   * @throws JMSException if the superclass fails to start
   */
  public void start()
    throws JMSException
  {
    super.start();
    if (this.started.commit(false, true)) {
      this.serverSocketThread = new Thread(this, toString());
      this.serverSocketThread.setDaemon(true);
      this.serverSocketThread.start();
    }
  }

  /**
   * @return a description containing the bind address of this channel
   */
  public String toString()
  {
    return "TcpTransportServerChannel@" + this.bindAddressURI;
  }

  /**
   * Accept loop. Runs until the channel is stopped, wrapping each accepted
   * socket in a {@link TcpTransportChannel} and registering it with
   * <code>addClient</code>. When asynchronous sending is enabled each client
   * channel gets its own executor whose queue is bounded by
   * <code>maxOutstandingMessages</code>.
   */
  public void run()
  {
    while (!this.closed.get()) {
      Socket socket = null;
      try {
        socket = this.serverSocket.accept();
        if (socket != null)
        {
          PooledExecutor executor = null;
          if (this.useAsyncSend) {
            executor = new PooledExecutor(new BoundedBuffer(this.maxOutstandingMessages), 1);
          }
          TcpTransportChannel channel = new TcpTransportChannel(this.wireFormat, socket, executor);
          addClient(channel);
        }
      }
      catch (Exception e) {
        // A non-null socket here means the failure happened after accept():
        // release the connection instead of leaking it.
        closeQuietly(socket);
        if (!this.closed.get()) {
          log.warn("run()", e);
          if (this.serverSocket == null || this.serverSocket.isClosed()) {
            // accept() can never succeed again; leave rather than spin.
            log.warn("Server socket is no longer open; accept loop terminating for " + this);
            break;
          }
        }
      }
    }
  }

  /**
   * Closes the given socket, ignoring a null argument and logging (at debug
   * level) any I/O failure.
   */
  private static void closeQuietly(Socket socket)
  {
    if (socket == null) {
      return;
    }
    try {
      socket.close();
    }
    catch (IOException ignored) {
      log.debug("Failed to close socket after accept failure: " + ignored);
    }
  }

  /**
   * @return whether accepted client channels are given an executor for asynchronous sending
   */
  public boolean isUseAsyncSend()
  {
    return this.useAsyncSend;
  }

  /**
   * @param useAsyncSend whether channels accepted from now on get an executor
   */
  public void setUseAsyncSend(boolean useAsyncSend) {
    this.useAsyncSend = useAsyncSend;
  }

  /**
   * @return the capacity of the bounded buffer backing each client's executor
   */
  public int getMaxOutstandingMessages() {
    return this.maxOutstandingMessages;
  }

  /**
   * @param maxOutstandingMessages buffer capacity used for executors created from now on
   */
  public void setMaxOutstandingMessages(int maxOutstandingMessages) {
    this.maxOutstandingMessages = maxOutstandingMessages;
  }

  /**
   * @return the listen backlog used when creating a server socket
   */
  public int getBacklog() {
    return this.backlog;
  }

  /**
   * @param backlog the listen backlog used by subsequent calls to
   *                {@link #createServerSocket(URI)}
   */
  public void setBacklog(int backlog) {
    this.backlog = backlog;
  }

  /**
   * Creates the server socket for the given bind URI. A missing or empty
   * host is treated as "localhost". When the resolved address equals
   * {@link InetAddress#getLocalHost()} the socket is created without an
   * explicit bind address; otherwise it is bound to the resolved address.
   *
   * @param bind the URI supplying host and port
   * @return the newly created server socket
   * @throws UnknownHostException if the host cannot be resolved
   * @throws IOException          if the socket cannot be opened
   */
  protected ServerSocket createServerSocket(URI bind)
    throws UnknownHostException, IOException
  {
    String host = bind.getHost();
    host = (host == null) || (host.length() == 0) ? "localhost" : host;
    InetAddress addr = InetAddress.getByName(host);
    if (addr.equals(InetAddress.getLocalHost())) {
      return new ServerSocket(bind.getPort(), this.backlog);
    }
    return new ServerSocket(bind.getPort(), this.backlog, addr);
  }
}